package com.hery.demo;

import com.hery.driver.JsonAlertJobDriver;
import com.hery.functions.AlertManagerSinkFunction;
import com.hery.functions.SourceFunctions;
import com.hery.pojo.KafkaAlertLog;
import com.hery.pojo.Rule;
import com.hery.utils.Config;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo job that wraps every incoming string record in a {@link KafkaAlertLog}
 * and hands it to an {@link AlertManagerSinkFunction}.
 *
 * <p>The raw records are additionally printed with the {@code "source"} label.
 */
public class AlertManagerSinkDemo {

    /**
     * Builds the streaming topology and runs it.
     *
     * @param args command-line arguments, forwarded to
     *             {@code Config.initApplicationConfig}
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Global application configuration.
        // NOTE(review): initialised with JsonAlertJobDriver.class rather than this
        // class — presumably so the demo shares that driver's settings; confirm.
        Configuration appConfig = Config.initApplicationConfig(args, JsonAlertJobDriver.class);

        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(1);

        DataStreamSource<String> rawEvents =
                execEnv.addSource(SourceFunctions.getKafkaEventSource());

        // Send the alert information to AlertManager: each record is paired with a
        // fresh, default-constructed Rule. A sample payload previously kept here as
        // commented-out code had two top-level JSON objects, "common_data" and
        // "gather_data".
        rawEvents
                .map(payload -> new KafkaAlertLog(payload, new Rule()))
                .addSink(new AlertManagerSinkFunction(appConfig));

        // Also echo the unmodified input records.
        rawEvents.print("source");

        // NOTE(review): "altet" looks like a typo for "alert"; the job name is left
        // untouched because it is a runtime-visible string.
        execEnv.execute("altet manager sink job");
    }
}
